C++11 并发学习(四)


学习自大神博客

多线程下生产者消费者模型

下面我们来看一下C++11多线程下生产者消费者模型

单生产者-单消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; //Item buffer size
static const int kItemsToProduce = 100; //How many items we plan to produce
struct ItemRepository {
int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列
size_t read_position; //消费者读取产品位置
size_t write_position; //生产者写入产品位置
std::mutex mtx; //互斥量,保护产品缓冲区
std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满
std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空
} gItemRepository; //产品库全局变量,生产者和消费者操作该变量
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item) {
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) {
//buffer is full
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生
}
(ir->item_buffer)[ir->write_position] = item; //写入产品
(ir->write_position)++; //写入位置后移
if (ir->write_position == kItemRepositorySize) {
//写入位置如果在队列最后 则需要重新设置为初始位置
ir->write_position = 0;
}
(ir->repo_not_empty).notify_all(); //通知消费者产品库不为空
lock.unlock();
}
int ConsumeItem(ItemRepository *ir) {
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生
}
data = (ir->item_buffer)[ir->read_position]; //读取产品
(ir->read_position)++; //读取位置后移
if (ir->read_position >= kItemRepositorySize) {
//读取位置如果在队列最后 则需要重新设置为初始位置
ir->read_position = 0;
}
(ir->repo_not_full).notify_all();
lock.unlock();
return data;
}
//生产者任务
void ProducerTask() {
for (int i = 1; i < kItemsToProduce; ++i) {
//sleep(1);
std::cout << "Produce the" << i << "^th item..." << std::endl;
ProduceItem(&gItemRepository, i); //循环生产 kItemsToProduce 个产品
}
}
//消费者任务
void ConsumerTask() {
static int cnt = 0;
while(1) {
sleep(1);
int item = ConsumeItem(&gItemRepository); //消费一个产品
std::cout << "Consume the" << item << "^th item" << std::endl;
if (++cnt == kItemsToProduce) {
//如果产品消费个数为 kItemsToProduce, 则退出.
break;
}
}
}
void InitItemRepository(ItemRepository *ir) {
ir->write_position = 0;
ir->read_position = 0;
}
int main() {
InitItemRepository(&gItemRepository);
std::thread producer(ProducerTask);
std::thread consumer(ConsumerTask);
producer.join();
consumer.join();
}

运行结果:


单生产者-多消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//与单生产者和单消费者模型不同的是,
//单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。
//所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; //Item buffer size
static const int kItemsToProduce = 100; //How many items we plan to produce
struct ItemRepository {
int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列
size_t read_position; //消费者读取产品位置
size_t write_position; //生产者写入产品位置
size_t item_counter;
std::mutex item_counter_mtx;
std::mutex mtx; //互斥量,保护产品缓冲区
std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满
std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空
} gItemRepository; //产品库全局变量,生产者和消费者操作该变量
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item) {
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) {
//buffer is full
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生
}
(ir->item_buffer)[ir->write_position] = item; //写入产品
(ir->write_position)++; //写入位置后移
if (ir->write_position == kItemRepositorySize) {
//写入位置如果在队列最后 则需要重新设置为初始位置
ir->write_position = 0;
}
(ir->repo_not_empty).notify_all(); //通知消费者产品库不为空
lock.unlock();
}
int ConsumeItem(ItemRepository *ir) {
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生
}
data = (ir->item_buffer)[ir->read_position]; //读取产品
(ir->read_position)++; //读取位置后移
if (ir->read_position >= kItemRepositorySize) {
//读取位置如果在队列最后 则需要重新设置为初始位置
ir->read_position = 0;
}
(ir->repo_not_full).notify_all();
lock.unlock();
return data;
}
//生产者任务
void ProducerTask() {
for (int i = 1; i < kItemsToProduce; ++i) {
//sleep(1);
std::cout << "Producer thread" << std::this_thread::get_id() << "Produce the" << i << "^th item..." << std::endl;
ProduceItem(&gItemRepository, i); //循环生产 kItemsToProduce 个产品
}
std::cout << "Producer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;
}
//消费者任务
void ConsumerTask() {
bool ready_to_exit = false;
while(1) {
sleep(1);
std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
if (gItemRepository.item_counter < kItemsToProduce) {
int item = ConsumeItem(&gItemRepository);
++(gItemRepository.item_counter);
std::cout << "Consumer thread" << std::this_thread::get_id() << "is consuming the" << item << "^th item"<< std::endl;
} else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) {
break;
}
}
std::cout << "Consumer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;
}
void InitItemRepository(ItemRepository *ir) {
ir->write_position = 0;
ir->read_position = 0;
ir->item_counter = 0;
}
int main() {
InitItemRepository(&gItemRepository);
std::thread producer(ProducerTask);
std::thread consumer1(ConsumerTask);
std::thread consumer2(ConsumerTask);
std::thread consumer3(ConsumerTask);
std::thread consumer4(ConsumerTask);
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
}

运行结果:


多生产者-单消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//与单生产者和单消费者模型不同的是,
//单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。
//所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; //Item buffer size
static const int kItemsToProduce = 100; //How many items we plan to produce
struct ItemRepository {
int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列
size_t read_position; //消费者读取产品位置
size_t write_position; //生产者写入产品位置
size_t item_counter;
std::mutex item_counter_mtx;
std::mutex mtx; //互斥量,保护产品缓冲区
std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满
std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空
} gItemRepository; //产品库全局变量,生产者和消费者操作该变量
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item) {
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) {
//buffer is full
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生
}
(ir->item_buffer)[ir->write_position] = item; //写入产品
(ir->write_position)++; //写入位置后移
if (ir->write_position == kItemRepositorySize) {
//写入位置如果在队列最后 则需要重新设置为初始位置
ir->write_position = 0;
}
(ir->repo_not_empty).notify_all(); //通知消费者产品库不为空
lock.unlock();
}
int ConsumeItem(ItemRepository *ir) {
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生
}
data = (ir->item_buffer)[ir->read_position]; //读取产品
(ir->read_position)++; //读取位置后移
if (ir->read_position >= kItemRepositorySize) {
//读取位置如果在队列最后 则需要重新设置为初始位置
ir->read_position = 0;
}
(ir->repo_not_full).notify_all();
lock.unlock();
return data;
}
//生产者任务
void ProducerTask() {
bool ready_to_exit = false;
while (1) {
sleep(1);
std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
if (gItemRepository.item_counter < kItemsToProduce) {
++(gItemRepository.item_counter);
ProduceItem(&gItemRepository, gItemRepository.item_counter);
std::cout << "Producer thread" << std::this_thread::get_id() << "is producing the" << gItemRepository.item_counter << "^th item..." << std::endl;
} else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) {
break;
}
}
std::cout << "Producer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;
}
//消费者任务
void ConsumerTask() {
static int item_consumed = 0;
while (1) {
sleep(1);
++item_consumed;
if (item_consumed <= kItemsToProduce) {
int item = ConsumeItem(&gItemRepository);
std::cout << "Consumer thread" << std::this_thread::get_id() << "is consuming the" << item << "^th item"<< std::endl;
} else break;
}
std::cout << "Consumer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;
}
void InitItemRepository(ItemRepository *ir) {
ir->write_position = 0;
ir->read_position = 0;
ir->item_counter = 0;
}
int main() {
InitItemRepository(&gItemRepository);
std::thread producer1(ProducerTask);
std::thread producer2(ProducerTask);
std::thread producer3(ProducerTask);
std::thread producer4(ProducerTask);
std::thread consumer(ConsumerTask);
producer1.join();
producer2.join();
producer3.join();
producer4.join();
consumer.join();
}

运行结果:


多生产者-多消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//该模型可以说是前面两种模型的综合,程序需要维护两个计数器,
//分别是生产者已生产产品的数目和消费者已取走产品的数目。
//另外也需要保护产品库在多个生产者和多个消费者互斥地访问
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; //Item buffer size
static const int kItemsToProduce = 100; //How many items we plan to produce
struct ItemRepository {
int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列
size_t read_position; //消费者读取产品位置
size_t write_position; //生产者写入产品位置
size_t produced_item_counter;
size_t consumed_item_counter;
std::mutex produced_item_counter_mtx;
std::mutex consumed_item_counter_mtx;
std::mutex mtx; //互斥量,保护产品缓冲区
std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满
std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空
} gItemRepository; //产品库全局变量,生产者和消费者操作该变量
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item) {
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) {
//buffer is full
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生
}
(ir->item_buffer)[ir->write_position] = item; //写入产品
(ir->write_position)++; //写入位置后移
if (ir->write_position == kItemRepositorySize) {
//写入位置如果在队列最后 则需要重新设置为初始位置
ir->write_position = 0;
}
(ir->repo_not_empty).notify_all(); //通知消费者产品库不为空
lock.unlock();
}
int ConsumeItem(ItemRepository *ir) {
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生
}
data = (ir->item_buffer)[ir->read_position]; //读取产品
(ir->read_position)++; //读取位置后移
if (ir->read_position >= kItemRepositorySize) {
//读取位置如果在队列最后 则需要重新设置为初始位置
ir->read_position = 0;
}
(ir->repo_not_full).notify_all();
lock.unlock();
return data;
}
//生产者任务
void ProducerTask() {
bool ready_to_exit = false;
while (1) {
sleep(1);
std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);
if (gItemRepository.produced_item_counter < kItemsToProduce) {
++(gItemRepository.produced_item_counter);
ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);
std::cout << "Producer thread" << std::this_thread::get_id() << "is producing the" << gItemRepository.produced_item_counter << "^th item..." << std::endl;
} else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) {
break;
}
}
std::cout << "Producer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;
}
//消费者任务
void ConsumerTask() {
bool ready_to_exit = false;
while (1) {
sleep(1);
std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
if (gItemRepository.consumed_item_counter <= kItemsToProduce) {
int item = ConsumeItem(&gItemRepository);
++(gItemRepository.consumed_item_counter);
std::cout << "Consumer thread" << std::this_thread::get_id() << "is consuming the" << gItemRepository.consumed_item_counter << "^th item"<< std::endl;
} else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) {
break;
}
}
std::cout << "Consumer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;
}
void InitItemRepository(ItemRepository *ir) {
ir->write_position = 0;
ir->read_position = 0;
ir->produced_item_counter = 0;
ir->consumed_item_counter = 0;
}
int main() {
InitItemRepository(&gItemRepository);
std::thread producer1(ProducerTask);
std::thread producer2(ProducerTask);
std::thread producer3(ProducerTask);
std::thread producer4(ProducerTask);
std::thread consumer1(ConsumerTask);
std::thread consumer2(ConsumerTask);
std::thread consumer3(ConsumerTask);
std::thread consumer4(ConsumerTask);
producer1.join();
producer2.join();
producer3.join();
producer4.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
}

运行结果: